package com.tpvlog.dfs.namenode.server;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.tpvlog.dfs.namenode.file.FSNameSystem;
import com.tpvlog.dfs.namenode.log.EditLogReplicator;
import com.tpvlog.dfs.namenode.register.*;
import com.tpvlog.dfs.rpc.service.*;
import io.grpc.stub.StreamObserver;

import java.util.ArrayList;
import java.util.List;

/**
 * gRPC implementation of the NameNode RPC service.
 *
 * <p>Each handler delegates to one of three collaborators: {@link FSNameSystem} for
 * metadata operations, {@link DataNodeManager} for DataNode bookkeeping and
 * {@link EditLogReplicator} for edits-log replication. Every unary handler completes
 * its {@link StreamObserver} on the normal path so that callers are never left waiting
 * on a call that will not be answered.
 *
 * @author Ressmix
 */
public class NameNodeServiceImpl extends NameNodeServiceGrpc.NameNodeServiceImplBase {
    // Status code: the operation succeeded
    public static final Integer STATUS_SUCCESS = 1;
    // Status code: the operation failed
    public static final Integer STATUS_FAILURE = 2;
    // Status code: the NameNode has been shut down and rejects the operation
    public static final Integer STATUS_SHUTDOWN = 3;

    // Core component that manages file-system metadata
    private final FSNameSystem namesystem;

    // Core component that manages the DataNodes
    private final DataNodeManager datanodeManager;

    // Core component that serves edits-log replication
    private final EditLogReplicator replicator;

    // Cleared by shutdown(); volatile so handler threads observe the change
    private volatile boolean isRunning = true;

    /**
     * Creates the service on top of its three collaborators.
     *
     * @param namesystem      metadata component
     * @param datanodeManager DataNode management component
     * @param replicator      edits-log replication component
     */
    public NameNodeServiceImpl(FSNameSystem namesystem, DataNodeManager datanodeManager, EditLogReplicator replicator) {
        this.namesystem = namesystem;
        this.datanodeManager = datanodeManager;
        this.replicator = replicator;
    }

    /**
     * Registers a DataNode and reports whether the registration was accepted.
     */
    @Override
    public void register(RegisterRequest request, StreamObserver<RegisterResponse> responseObserver) {
        boolean succ = datanodeManager.register(request.getIp(), request.getHostname(), request.getNioPort());

        RegisterResponse response = RegisterResponse.newBuilder()
                .setStatus(succ ? STATUS_SUCCESS : STATUS_FAILURE)
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    /**
     * Handles a DataNode heartbeat.
     *
     * <p>On success the response carries every replicate / remove-replica task that was
     * queued for the node. When the heartbeat is rejected, or the node can no longer be
     * looked up, the response tells the node to register again and send a full storage
     * report.
     */
    @Override
    public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
        boolean succ = datanodeManager.heartbeat(request.getIp(), request.getHostname(), request.getNioPort());
        // Guard against the lookup returning null after an accepted heartbeat; the
        // original dereferenced the result unconditionally.
        DataNodeInfo datanode = succ
                ? datanodeManager.getDataNodeInfo(request.getIp(), request.getHostname())
                : null;

        List<Command> cmdList = new ArrayList<>();
        int status;
        if (datanode != null) {
            collectPendingCommands(datanode, cmdList);
            System.out.println("接收到数据节点【" + datanode + "】的心跳，他的命令列表为：" + cmdList);
            status = STATUS_SUCCESS;
        } else {
            cmdList.add(new Command(Command.REGISTER));
            cmdList.add(new Command(Command.REPORT_COMPLETE_STORAGE_INFO));
            status = STATUS_FAILURE;
        }

        HeartbeatResponse response = HeartbeatResponse.newBuilder()
                .setStatus(status)
                .setCommands(JSONArray.toJSONString(cmdList))
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    /**
     * Drains the replicate and remove-replica task queues of {@code datanode} into
     * {@code cmdList}, replicate commands first.
     */
    private void collectPendingCommands(DataNodeInfo datanode, List<Command> cmdList) {
        ReplicateTask replicateTask;
        while ((replicateTask = datanode.pollReplicateTask()) != null) {
            Command command = new Command(Command.REPLICATE);
            command.setContent(JSONObject.toJSONString(replicateTask));
            cmdList.add(command);
        }
        RemoveReplicaTask removeReplicaTask;
        while ((removeReplicaTask = datanode.pollRemoveReplicaTask()) != null) {
            Command command = new Command(Command.REMOVE_REPLICA);
            command.setContent(JSONObject.toJSONString(removeReplicaTask));
            cmdList.add(command);
        }
    }

    /**
     * Creates a directory.
     *
     * <p>Responds with {@code STATUS_SHUTDOWN} once the NameNode has been stopped and
     * with {@code STATUS_FAILURE} when the metadata component throws, instead of leaving
     * the call unanswered.
     */
    @Override
    public void mkdir(MkDirRequest request, StreamObserver<MkDirResponse> responseObserver) {
        int status;
        if (!isRunning) {
            status = STATUS_SHUTDOWN;
        } else {
            try {
                namesystem.mkdir(request.getPath());
                status = STATUS_SUCCESS;
            } catch (Exception e) {
                e.printStackTrace();
                status = STATUS_FAILURE;
            }
        }
        responseObserver.onNext(MkDirResponse.newBuilder().setStatus(status).build());
        responseObserver.onCompleted();
    }

    /**
     * Creates a file entry in the namespace.
     *
     * <p>Responds with {@code STATUS_SHUTDOWN} once the NameNode has been stopped and
     * with {@code STATUS_FAILURE} when creation is refused or throws.
     */
    @Override
    public void createFile(CreateFileRequest request, StreamObserver<CreateFileResponse> responseObserver) {
        int status;
        if (!isRunning) {
            status = STATUS_SHUTDOWN;
        } else {
            try {
                Boolean success = namesystem.createFile(request.getFilename());
                // Boolean.TRUE.equals also maps a null result to failure.
                status = Boolean.TRUE.equals(success) ? STATUS_SUCCESS : STATUS_FAILURE;
            } catch (Exception e) {
                e.printStackTrace();
                status = STATUS_FAILURE;
            }
        }
        responseObserver.onNext(CreateFileResponse.newBuilder().setStatus(status).build());
        responseObserver.onCompleted();
    }

    /**
     * Allocates the DataNodes a client should upload a file (and its replicas) to.
     *
     * <p>The chosen nodes are returned as a JSON array. A failure in the allocation is
     * reported as {@code STATUS_FAILURE} with no node list.
     */
    @Override
    public void allocateDataNodes(AllocateDataNodesRequest request, StreamObserver<AllocateDataNodesResponse> responseObserver) {
        AllocateDataNodesResponse response;
        if (!isRunning) {
            response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
        } else {
            try {
                List<DataNodeInfo> datanodes =
                        datanodeManager.allocateDataNodes(request.getFilesize(), request.getExcludedDataNodeId());
                response = AllocateDataNodesResponse.newBuilder()
                        .setStatus(STATUS_SUCCESS)
                        .setDatanodes(JSONArray.toJSONString(datanodes))
                        .build();
            } catch (Exception e) {
                e.printStackTrace();
                response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_FAILURE).build();
            }
        }
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    /**
     * Picks a DataNode from which a file can be downloaded.
     *
     * @param request          carries the file name and the id of a DataNode to exclude
     * @param responseObserver receives the chosen DataNode serialized as JSON
     */
    @Override
    public void getDataNodeForFile(GetDataNodeForFileRequest request, StreamObserver<GetDataNodeForFileResponse> responseObserver) {
        DataNodeInfo datanode =
                datanodeManager.getDataNodeForFile(request.getFilename(), request.getExcludedDataNodeId());

        GetDataNodeForFileResponse response = GetDataNodeForFileResponse.newBuilder()
                .setDatanode(JSONObject.toJSONString(datanode))
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    /**
     * Graceful shutdown: stops accepting namespace mutations, flushes the name system
     * and saves the checkpoint txid.
     *
     * <p>The call is now answered and completed; previously no response was ever sent,
     * so the RPC never finished from the caller's point of view.
     */
    @Override
    public void shutdown(ShutdownRequest request, StreamObserver<ShutdownResponse> responseObserver) {
        isRunning = false;
        namesystem.flush();
        namesystem.saveCheckpointTxid();
        System.out.println("优雅关闭namenode......");

        // NOTE(review): ShutdownResponse's fields are not visible from this file, so an
        // empty message is returned — set a status here if the proto defines one.
        responseObserver.onNext(ShutdownResponse.newBuilder().build());
        responseObserver.onCompleted();
    }

    /**
     * Serves an edits-log fetch: returns the entries obtained from the replicator for
     * the requested synced txid, or an empty JSON array once the NameNode is stopped.
     */
    @Override
    public void fetchEditsLog(FetchEditsLogRequest request, StreamObserver<FetchEditsLogResponse> responseObserver) {
        String editsLogJson;
        if (!isRunning) {
            editsLogJson = new JSONArray().toJSONString();
        } else {
            List<String> list = replicator.fetchEditsLog(request.getSyncedTxid());
            editsLogJson = JSONObject.toJSONString(list);
        }
        responseObserver.onNext(FetchEditsLogResponse.newBuilder().setEditsLog(editsLogJson).build());
        responseObserver.onCompleted();
    }

    /**
     * Records the checkpoint txid supplied by the caller.
     */
    @Override
    public void updateCheckpointTxid(UpdateCheckpointTxidRequest request,
                                     StreamObserver<UpdateCheckpointTxidResponse> responseObserver) {
        namesystem.setCheckpointTxid(request.getTxid());
        UpdateCheckpointTxidResponse response =
                UpdateCheckpointTxidResponse.newBuilder().setStatus(STATUS_SUCCESS).build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    /**
     * Accepts a DataNode's full storage report (its file name list and stored size).
     */
    @Override
    public void fullyReportDataNodeInfo(FullyReportRequest request, StreamObserver<FullyReportResponse> responseObserver) {
        String ip = request.getIp();
        String hostname = request.getHostname();
        Long storedDataSize = request.getStoredDataSize();

        // parseArray may return null (e.g. when the field is empty); treat that as a
        // report with no files rather than failing with a NullPointerException.
        JSONArray filenames = JSONArray.parseArray(request.getFilenameList());
        List<String> filenameList = new ArrayList<>();
        if (filenames != null) {
            for (int i = 0; i < filenames.size(); i++) {
                filenameList.add(filenames.getString(i));
            }
        }

        datanodeManager.fullyReportDataNodeInfo(ip, hostname, filenameList, storedDataSize);
        FullyReportResponse response = FullyReportResponse.newBuilder().setStatus(STATUS_SUCCESS).build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    /**
     * Accepts a DataNode's incremental report about a single file.
     */
    @Override
    public void deltaReportDataNodeInfo(DeltaReportRequest request, StreamObserver<DeltaReportResponse> responseObserver) {
        String hostname = request.getHostname();
        String ip = request.getIp();
        String filename = request.getFilename();
        Long filesize = request.getFilesize();

        int status;
        try {
            datanodeManager.deltaReportDataNodeInfo(ip, hostname, filename, filesize);
            status = STATUS_SUCCESS;
        } catch (Exception e) {
            e.printStackTrace();
            status = STATUS_FAILURE;
        }
        responseObserver.onNext(DeltaReportResponse.newBuilder().setStatus(status).build());
        responseObserver.onCompleted();
    }

    /**
     * Triggers a rebalance of the DataNode cluster.
     *
     * @param request          the rebalance request (no fields are read)
     * @param responseObserver receives the success / failure status
     */
    @Override
    public void rebalance(RebalanceRequest request, StreamObserver<RebalanceResponse> responseObserver) {
        int status;
        try {
            datanodeManager.rebalance();
            status = STATUS_SUCCESS;
        } catch (Exception e) {
            e.printStackTrace();
            status = STATUS_FAILURE;
        }
        responseObserver.onNext(RebalanceResponse.newBuilder().setStatus(status).build());
        responseObserver.onCompleted();
    }
}
